package com.bw.gmall.realtime.app.dwd;

import com.alibaba.fastjson.JSONAware;
import com.alibaba.fastjson.JSONObject;
import com.bw.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

public class DwdTrafficUserJumpDetail {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String topic="dwd_traffic_page_log";
        String groupId="dwd_traffic_user_jump_detail";
        FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId);
        DataStreamSource<String> pageLog = env.addSource(kafkaConsumer);
        //转换结构
        SingleOutputStreamOperator<JSONObject> mappedStream = pageLog.flatMap(
                new FlatMapFunction<String, JSONObject>() {
                    @Override
                    public void flatMap(String s, Collector<JSONObject> collector) throws Exception {
                        try {
                            JSONObject jsonObj = JSONObject.parseObject(s);
                            collector.collect(jsonObj);
                        } catch (Exception e) {
                            System.out.println("脏数据" + s);
                        }
                    }
                }
        );

        //设置水位线  用于用户跳出统计
        SingleOutputStreamOperator<JSONObject> withWatermarkStream = mappedStream.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<JSONObject>
                                forMonotonousTimestamps()
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<JSONObject>() {
                                    @Override
                                    public long extractTimestamp(JSONObject jsonObject, long l) {
                                        return jsonObject.getLong("ts");
                                    }
                                }
                        )
        );

        //按照mid分组
        KeyedStream<JSONObject, String> keyedStream = withWatermarkStream
                .keyBy(x -> x.getJSONObject("common")
                        .getString("mid"));

        //keyedStream.print("原始数据(水位线和分组):")
        //CEP 匹配规则
        Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("first").where(
                new SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonObj) {
                        return jsonObj.getJSONObject("page").getString("last_page_id")==null;
                    }
                }
        ).next("second").where(
                new SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonObj) {
                        return jsonObj.getJSONObject("page").getString("last_page_id")==null;
                    }
                }
        ).within(Time.seconds(10l));
//把Pattern 应用到流上
        PatternStream<JSONObject> patternStream = CEP.pattern(keyedStream, pattern);

//提取匹配上的事件以及超时事件
        OutputTag<JSONObject> timeoutTag = new OutputTag<JSONObject>("timeoutTag") {
        };


        SingleOutputStreamOperator<JSONObject> flatSelectStream = patternStream.flatSelect(
                timeoutTag,
                new PatternFlatTimeoutFunction<JSONObject, JSONObject>() {
                    @Override
                    public void timeout(Map<String, List<JSONObject>> map, long l, Collector<JSONObject> collector) throws Exception {
                        JSONObject element = map.get("first").get(0);
                        collector.collect(element);
                    }
                },
                new PatternFlatSelectFunction<JSONObject, JSONObject>() {
                    @Override
                    public void flatSelect(Map<String, List<JSONObject>> map, Collector<JSONObject> collector) throws Exception {
                        JSONObject element = map.get("first").get(0);
                        collector.collect(element);
                    }
                }

        );

        flatSelectStream.print("主流");
        DataStream<JSONObject> timeOutDStream = flatSelectStream.getSideOutput(timeoutTag);
        timeOutDStream.print("超时流");

        //合并两个流并将数据写出到Kafka
        DataStream<JSONObject> unionDStream = flatSelectStream.union(timeOutDStream);
        unionDStream.print(">>>>>>>>>>>>>>>>>>>>>>>");
        String targetTopic="dwd_traffic_user_jump_detail";
        FlinkKafkaProducer<String> kafkaProducer = MyKafkaUtil.getFlinkKafkaProducer(targetTopic);
        unionDStream.map(JSONAware::toJSONString).addSink(kafkaProducer);
        env.execute();
    }
}
